package com.offcn.bigdata.spark.p2

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * SparkRDD算子操作之transformation
  *
  * map
  * flatMap
  * filter；
  *
  * mapPartitions
  * mapPartitionWithIndex
  * sample
  * union
  * distinct
  * join
  * coalesce
  * repartition
  * groupByKey
  * reduceByKey
  * foldByKey
  * combineByKey
  * aggregateByKey
  */
object _01TransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                    .setAppName("TransformationOps")
                    .setMaster("local[*]")
        val sc = new SparkContext(conf)

//        mapPartitionsOps(sc)
//        mapPartitionWithIndex(sc)
//        sampleOps(sc)
//        unionOps(sc)
//        distinctOps(sc)
//        joinOps(sc)
//        repartitionOps(sc)
//        groupByKeyOps(sc)
        reduceByKeyOps(sc)
        sc.stop()
    }

    /**
      * reduceByKey和foldByKey都是按照key对value进行聚合操作
      * 其区别和scala中reduce与fold的区别是一模一样，取决于reduce的初始化值是集合中的第一个元素，fold的初始化值需要自定义
      */
    def reduceByKeyOps(sc: SparkContext): Unit = {
        //wordcount
        val lines = sc.parallelize(List(
            "hello you",
            "hello me",
            "hello lan lan"
        ))

        val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))
        var ret = pairs.reduceByKey(_+_)
        println("reduceByKey的操作")
        ret.foreach(println)
        println("foldByKey的操作")
        ret = pairs.foldByKey(0)(_+_)
        ret.foreach(println)
    }

    /**
      * A[(K, V)].groupByKey(numPartitions)
      * 结果集的类型：[(K, Iterable[V])]
      *     sql中的groupBy
      * 需要注意一点的是，在以后工作中，尽量避免使用groupByKey算子，因为不想reduceByKey一样，会有本地预聚合，效率比较低
      * 还有其结果是把相同key所对应的value都拉取到了同一个分区中，如果数据量比较大，容易出现OOM。
      * 如果光从效率上说，我们可以使用比如reduceByKey，combineByKey来完成代替。
      */
    def groupByKeyOps(sc: SparkContext): Unit = {
        case class Student(id: Int, name: String, province: String)
        val stuRDD = sc.parallelize(List(
            Student(1, "刘博", "天津"),
            Student(2, "霍龙飞", "山西"),
            Student(3, "付云瑾", "山东"),
            Student(4, "何浩", "湖南"),
            Student(10086, "刘武", "湖南"),
            Student(10087, "成思远", "山西"),
            Student(10089, "小岚岚", "山东")
        ))

        val province2Infos: RDD[(String, Iterable[Student])] = stuRDD.map(stu => (stu.province, stu)).groupByKey()

        province2Infos.foreach{case (province, stus) => {
            println(s"${province}对应的学生有：${stus.toList}")
        }}
    }

    /**
      * coalesce(numPartitions: Int, shuffle:Boolean = false)
      *     以为合并，可以理解为重分区的含义
      *     比如原先有10个分区，我合并之后为5个；或者原先有5个，重分区之后又10个。
      * 默认情况下，coalesce算子是一个窄依赖算子，因为默认的参数shuffle为false，这个时候分区只能减少，不能增加。
      * 但是一个比较极端的情况就是分区之后的个数1，这个时候可能会造成数据的不完整，所此时建议将shuffle设置为true。
      * 如果要想增大分区，此时shuffle就必须要设置为true，此时的算子为宽依赖算子
      *     就可以使用repartition算子来代替coalesce算子
      */
    def repartitionOps(sc: SparkContext): Unit = {
        var rdd = sc.parallelize(1 to 10, 4)
        println("起始rdd的分区个数：" + rdd.getNumPartitions)
        //重分区 较少
        rdd = rdd.coalesce(2)
        println("减少分区之后的rdd分区个数： " + rdd.getNumPartitions)

        //增大分区

        rdd = rdd.repartition(6)
        println("增加分区之后的rdd分区个数： " + rdd.getNumPartitions)
        rdd.count()
    }

    /**
      * A.join(B)
      *     相当于sql中的join
      *        A a xxx join B b on a.id = b.aid
      *  因为需要通过外键进行关联，所以在spark中的rdd需要被转化为[K, V]才可以进行关联
      * 内连接
      *     A a inner join B b on a.id = b.aid
      *     A a, B b where a.id = b.aid
      *     返回左右两张表中的交集
      * 外连接
      *     A a xxx outer join B b on a.id = b.aid
      *     左外连接
      *         A a left outer join B b on a.id = b.aid
      *         返回左表所有，返回右表能够关联上的数据，关联不上的数据显示为null
      *     右外连接
      *         A a right outer join B b on a.id = b.aid
      *         返回右表所有，左边能够关联上的显示，关联不上的显示为null
      *
      *     共同的特点，被关联表的返回结果，可能有，可能没有
      * 全连接
      *     A a full outer join B b on a.id = b.aid
      *     就相当于 = 左外连接 + 右外连接
      *
      */
    def joinOps(sc: SparkContext): Unit = {
        case class Student(id: Int, name: String, age: Int)
        case class Score(sid: Int, course: String, score: Float)

        val stuRDD: RDD[Student] = sc.parallelize(List(
            Student(1, "刘博", 15),
            Student(2, "霍龙飞", 16),
            Student(3, "付云瑾", 17),
            Student(4, "何浩", 18),
            Student(10086, "刘武", 28)
        ))
        val scoreRDD:RDD[Score] = sc.parallelize(List(
            Score(1, "语文", 70.5f),
            Score(2, "数学", 80.5f),
            Score(3, "英语", 30.5f),
            Score(4, "体育", 99f),
            Score(10010, "语文", 70.5f)
        ))

        val id2Stu:RDD[(Int, Student)] = stuRDD.map(stu => (stu.id, stu))
        val id2Score:RDD[(Int, Score)] = scoreRDD.map(score => (score.sid, score))

        println("==student和score表进行inner join返回交集==")
        val info:RDD[(Int, (Student, Score))] = id2Stu.join(id2Score)
//        info.foreach(t => {
//            println(s"id: ${t._1}, student: ${t._2._1}, score: ${t._2._2}")
//        })
        info.foreach{case (id, (stu, score)) => {
            println(s"id: ${id}, student: ${stu}, score: ${score}")
        }}
        println("==student和score表进行left outer join返回交集==")
        val leftJoin:RDD[(Int, (Student, Option[Score]))] = id2Stu.leftOuterJoin(id2Score)
        leftJoin.foreach{case (id, (stu, scoreOption)) => {
            println(s"id: ${id}, student: ${stu}, score: ${scoreOption.getOrElse("UnKnow")}")
        }}
        println("==student和score表进行right outer join返回交集==")
        val rightJoin:RDD[(Int, (Option[Student], Score))] = id2Stu.rightOuterJoin(id2Score)
        rightJoin.foreach{case (id, (stuOption, score)) => {
            println(s"id: ${id}, student: ${stuOption.getOrElse("UnKnow")}, score: ${score}")
        }}
        println("==student和score表进行full outer join返回交集==")
        val fullJoin:RDD[(Int, (Option[Student], Option[Score]))] = id2Stu.fullOuterJoin(id2Score)
        fullJoin.foreach{case (id, (stuOption, scoreOption)) => {
            println(s"id: ${id}, student: ${stuOption.getOrElse("UnKnow")}, score: ${scoreOption.getOrElse("UnKnow")}")
        }}
    }

    /**
      * 就是sql中的distinct去重操作
      *     是一个shuffle操作，是一个宽依赖算子
      */
    def distinctOps(sc: SparkContext): Unit = {
        var rdd = sc.parallelize(List(
            1, 3, 2, 9, 3, 4, 6, 1
        ), 2)
        rdd = rdd.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"rdd中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.map(_ * 1).toIterator
        })

        rdd.distinct().mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"去重之后的rdd中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.map(_ * 1).toIterator
        }).count
    }
    /*
        rdd.union(other)
            将两个rdd进行联合，相当于sql中的union all
          如果rdd的分区个数是A，other的分区个数是B，union之后的RDD的分区个数就是A+B
     */
    def unionOps(sc: SparkContext): Unit = {
        var listRDD1 = sc.parallelize(1 to 10, 2)
        var listRDD2 = sc.parallelize(6 to 12, 2)
        listRDD1 = listRDD1.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"listRDD1中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.map(_ * 2).toIterator
        })

        listRDD2 = listRDD2.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"listRDD2中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.map(_ * 2).toIterator
        })

        //联合操作
        var ret = listRDD1.union(listRDD2)

        ret = ret.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"ret中partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.map(_ / 2).toIterator
        })

        ret.count()
    }

    /**
      * sample：抽样
      *  sample(withReplacement: Boolean, fraction: Double, seed: Long)
      *  从rdd中抽取指定比例的数据
      *  withReplacement:   是否有放回的抽样
      *         true        有放回
      *         false       无放回
      *  fraction       :   抽取的比例  [0, 1]
      *  seed           :   随机数种子，有默认值
      *  有啥用：
      *     主要就用这个算子来当整体不方便进行处理，比如发生数据倾斜的时候，就用sample评估整体，到底是哪个key引发的dataskew（数据倾斜）
      *  同时需要了解的是，该算子是非准确抽样算子，也就说，抽取的数据条数不是严格按照比例抽取的，会有上下浮动。
      */
    def sampleOps(sc: SparkContext): Unit = {
        val list = 1 to 100000

        val listRDD = sc.parallelize(list)

        var sampled = listRDD.sample(true, 0.01)
        println("有放回抽样的数据集：" + sampled.count())

        sampled = listRDD.sample(false, 0.01)
        println("无放回抽样的数据集：" + sampled.count())
    }

    /**
      * mapPartitions(p: Iterator[A] => Iterator[B])
      * mapPartitionWithIndex(partitionIndex: Int, p: Iterator[A] => Iterator[B])
      * 比mapPartition就多了一个分区的索引编号
      */
    def mapPartitionWithIndex(sc: SparkContext): Unit = {
        val list = 1 to 10
        val listRDD:RDD[Int] = sc.parallelize(list)

        val ret = listRDD.mapPartitionsWithIndex((index, partition) => {
            val list = partition.toList
            println(s"partition<${index}>中的数据为：" + list.mkString("[", ", ", "]"))
            list.map(_ * 2).toIterator
        })

        ret.foreach(println)
    }
    /**
      * map(p: A => B)
      * mapPartitions(p: Iterator[A] => Iterator[B])
      *     map每一次都只会处理rdd中的一条记录，mapPartitions每一次处理RDD的一个partition数据。
      * 就可以理解为mapPartitions是map的批量处理操作，效率要高于map，同时又因为一次性加载一个partition
      * 的数据，所以可能会存在内存不足导致OOM(OutOfMemory)的问题
      */
    def mapPartitionsOps(sc: SparkContext): Unit = {
        val list = 1 to 10
        val listRDD:RDD[Int] = sc.parallelize(list)

//        listRDD.map(_ * 2)
        val ret = listRDD.mapPartitions(partition => {
            val list = partition.toList
            println("partition: " + list.mkString("[", ", ", "]"))
            list.map(_ * 2).toIterator
        })

        ret.foreach(println)
    }
}
